跳到主要内容

gRPC 的 UnknownServiceHandler

今天在看 B 站的 powermock 源码时,发现它是通过 grpc.UnknownServiceHandler 这个 ServerOption 来实现 gRPC 服务端 Mock 的:当请求的服务或方法没有在 Server 上注册时,gRPC 不再直接返回 Unimplemented 错误,而是把请求交给这个 handler 处理。

使用示例:

// Route every RPC whose service or method is not registered on this server
// to s.handleStream.
server := grpc.NewServer(grpc.UnknownServiceHandler(s.handleStream))
listener, err := net.Listen("tcp", s.cfg.Address)
// A failed Listen leaves listener nil, so Serve must only run when Listen
// succeeded; otherwise err keeps the Listen error for the check below.
if err == nil {
	err = server.Serve(listener)
}
if err != nil {
	server.GracefulStop()
}

// ....

// handleStream answers an RPC with a mocked response. It has the signature
// expected by grpc.UnknownServiceHandler, which is how it is installed on the
// server.
//
// It resolves the full method name from the stream, looks the method up in
// s.protoManager, receives one request message into a dynamic message built
// from the method's input type, and marshals that request to JSON. The
// mocked response then supplies the trailer, optional header, status code
// and body that are written back to the stream.
//
// A gRPC status error is returned when the method name cannot be read from
// the stream, the method is unknown, the request cannot be received or
// marshaled, the header or body cannot be written, or the mocked response
// carries a non-zero status code.
func (s *MockServer) handleStream(srv interface{}, stream grpc.ServerStream) error {
	fullMethodName, ok := grpc.MethodFromServerStream(stream)
	if !ok {
		return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
	}
	// The ok result is deliberately ignored: md is only used for logging
	// here, and a nil metadata value is acceptable for that purpose.
	md, _ := metadata.FromIncomingContext(stream.Context())

	s.LogInfo(map[string]interface{}{
		"path":     fullMethodName,
		"metadata": md,
	}, "request received")

	method, ok := s.protoManager.GetMethod(fullMethodName)
	if !ok {
		return status.Errorf(codes.NotFound, "method not found")
	}

	// The request type is only known at runtime, so decode into a dynamic
	// message created from the method's input descriptor.
	request := dynamic.NewMessage(method.GetInputType())
	if err := stream.RecvMsg(request); err != nil {
		// Include the cause, like the header/send failures below do.
		return status.Errorf(codes.Unknown, "failed to recv request: %s", err)
	}
	// NOTE(review): data is presumably consumed by the elided
	// response-building step below; confirm against the full source.
	data, err := request.MarshalJSONPB(&jsonpb.Marshaler{})
	if err != nil {
		return status.Errorf(codes.Unknown, "failed to marshal request: %s", err)
	}

	// Build the response data yourself.
	response, err := // ...

	if err != nil {
		return err
	}
	// The trailer is set before the status-code check, so it is set on both
	// the error path and the success path.
	stream.SetTrailer(metadata.New(response.Trailer))
	if len(response.Header) > 0 {
		if err := stream.SetHeader(metadata.New(response.Header)); err != nil {
			return status.Errorf(codes.Unavailable, "failed to set header: %s", err)
		}
	}
	// A non-zero code means the mock asks for an error status instead of a
	// response body.
	if response.Code != 0 {
		return status.Errorf(codes.Code(response.Code), "expected code is: %d", response.Code)
	}
	if err := stream.SendMsg(response.Body); err != nil {
		return status.Errorf(codes.Internal, "failed to send message: %s", err)
	}
	return nil
}

TODO: 待完善...